// Copyright (C) 2020 THL A29 Limited, a Tencent company.  All rights reserved.
// Please refer to the license text that comes with this tendis open source
// project for additional information.

#include <algorithm>
#include <numeric>
#include <string>
#include <utility>
#include <unordered_set>
#include <limits>
#include <list>
#include "tendisplus/commands/dump.h"
#include "tendisplus/commands/command.h"
#include "tendisplus/storage/skiplist.h"
#include "tendisplus/utils/string.h"
#include "tendisplus/utils/redis_port.h"
#include "tendisplus/storage/record.h"

namespace tendisplus {
template <typename T>
size_t easyCopy(std::vector<byte>* buf, size_t* pos, T element) {
  if (*pos + sizeof(T) > buf->size()) {
    buf->resize(*pos + sizeof(T));
  }
  auto* ptr = reinterpret_cast<byte*>(&element);
  std::copy(ptr, (ptr + sizeof(T)), buf->begin() + *pos);
  *pos += sizeof(T);
  return sizeof(T);
}

template <typename T>
size_t easyCopy(std::vector<byte>* buf,
                size_t* pos,
                const T* array,
                size_t len) {
  if (*pos + len > buf->size()) {
    buf->resize(*pos + len * sizeof(T));
  }
  auto* ptr = const_cast<byte*>(reinterpret_cast<const byte*>(array));
  std::copy(ptr, (ptr + len * sizeof(T)), buf->begin() + *pos);
  *pos += len * sizeof(T);
  return len * sizeof(T);
}

uint8_t decodeType(RecordType type) {
  uint8_t typeMask(0);
  switch (type) {
    case RecordType::RT_HASH_META:
      typeMask = 4 << 4;
      break;
    case RecordType::RT_LIST_META:
      typeMask = 1 << 4;
      break;
    case RecordType::RT_SET_META:
      typeMask = 2 << 4;
      break;
    case RecordType::RT_ZSET_META:
      typeMask = 3 << 4;
      break;
    case RecordType::RT_KV:
      typeMask = 0 << 4;
      break;
    default:
      LOG(ERROR) << "get invalid record type" << rt2Char(type)
                 << "in iteration";
      INVARIANT(0);
  }

  return typeMask;
}

template <typename T>
size_t easyCopy(T* dest, const std::string& buf, size_t* pos) {
  if (buf.size() < *pos) {
    return 0;
  }
  byte* ptr = reinterpret_cast<byte*>(dest);
  size_t end = *pos + sizeof(T);
  std::copy(&buf[*pos], &buf[end], ptr);
  *pos += sizeof(T);
  return sizeof(T);
}

// dump
// Base class
Serializer::Serializer(Session* sess,
                       const std::string& key,
                       DumpType type,
                       RecordValue&& rv)
  : _begin(0), _end(0), _sess(sess), _key(key), _type(type), _pos(0), _rv(rv) {}

Expected<size_t> Serializer::saveObjectType(std::vector<byte>* payload,
                                            size_t* pos,
                                            DumpType type) {
  return easyCopy(payload, pos, type);
}

Expected<size_t> Serializer::saveLen(std::vector<byte>* payload,
                                     size_t* pos,
                                     size_t len) {
  byte header[2];

  if (len < (1 << 6)) {
    header[0] = (len & 0xff) | (RDB_6BITLEN << 6);
    return easyCopy(payload, pos, header, 1);
  } else if (len < (1 << 14)) {
    header[0] = ((len >> 8) & 0xff) | (RDB_14BITLEN << 6);
    header[1] = len & 0xff;
    return easyCopy(payload, pos, header, 2);
  } else if (len <= UINT32_MAX) {
    header[0] = RDB_32BITLEN;
    if (1 != easyCopy(payload, pos, header, 1)) {
      return {ErrorCodes::ERR_INTERNAL, "copy len to buffer failed"};
    }
    uint32_t len32 = htonl(len);
    return (1 + easyCopy(payload, pos, len32));
  } else {
    header[0] = RDB_64BITLEN;
    if (1 != easyCopy(payload, pos, header, 1)) {
      return {ErrorCodes::ERR_INTERNAL, "copy len to buffer failed"};
    }
    uint64_t len64 = redis_port::htonll(static_cast<uint64_t>(len));
    return (1 + easyCopy(payload, pos, len64));
  }
}

size_t Serializer::saveString(std::vector<byte>* payload,
                              size_t* pos,
                              const std::string& str) {
  size_t written(0);
  auto wr = Serializer::saveLen(payload, pos, str.size());
  INVARIANT(wr.value() > 0);
  written += wr.value();
  written += easyCopy(payload, pos, str.c_str(), str.size());
  return written;
}

Expected<std::vector<byte>> Serializer::dump(bool prefixVer) {
  std::vector<byte> payload;

  if (prefixVer) {
    auto revision = _rv.getVersionEP();
    /* special case for keys that directly generated by tendisplus iteself
     */
    if (revision == UINT64_MAX) {
      revision = 0;
      LOG(WARNING) << "redis send dumpx command to an"
                   << "tendis-generated key: " << _key;
    }
    Serializer::saveLen(&payload, &_pos, revision);
  }
  Serializer::saveObjectType(&payload, &_pos, _type);
  INVARIANT_D(_pos > 0);

  auto expRet = dumpObject(&payload);
  if (!expRet.ok()) {
    return expRet.status();
  }

  /* Write the footer, this is how it looks like:
   * ----------------+---------------------+---------------+
   * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
   * ----------------+---------------------+---------------+
   * RDB version and CRC are both in little endian.
   */
  byte version[2];
  version[0] = RDB_VERSION & 0xff;
  version[1] = (RDB_VERSION >> 8) & 0xff;
  easyCopy(&payload, &_pos, version, 2);

  uint64_t crc = redis_port::crc64(0, &payload[_begin], _pos - _begin);
  easyCopy(&payload, &_pos, crc);
  _end = _pos;
  return payload;
}

// Command who can only see base class Serializer.
class DumpCommand : public Command {
 public:
  DumpCommand() : Command("dump", "r") {}

  ssize_t arity() const {
    return 2;
  }

  int32_t firstkey() const {
    return 1;
  }
  int32_t lastkey() const {
    return 1;
  }

  int32_t keystep() const {
    return 1;
  }

  Expected<std::string> run(Session* sess) final {
    const std::string& key = sess->getArgs()[1];
    std::vector<byte> buf;

    auto server = sess->getServerEntry();
    auto expdb =
      server->getSegmentMgr()->getDbWithKeyLock(sess, key, Command::RdLock());
    auto exps = getSerializer(sess, key);
    if (!exps.ok()) {
      if (exps.status().code() == ErrorCodes::ERR_EXPIRED ||
          exps.status().code() == ErrorCodes::ERR_NOTFOUND) {
        return Command::fmtNull();
      }
      return exps.status();
    }

    auto expBuf = exps.value()->dump();
    if (!expBuf.ok()) {
      return expBuf.status();
    }
    buf = std::move(expBuf.value());

    // expect our g++ compiler will do cow in this ctor
    std::string output(buf.begin() + exps.value()->_begin,
                       buf.begin() + exps.value()->_end);
    return Command::fmtBulk(output);
  }
} dumpCommand;

class DumpXCommand : public Command {
 public:
  DumpXCommand() : Command("dumpx", "rs") {}

  ssize_t arity() const {
    return -3;
  }

  int32_t firstkey() const {
    return 2;
  }
  int32_t lastkey() const {
    return -1;
  }

  int32_t keystep() const {
    return 2;
  }

  virtual std::string fmtRestorexSubCmd(const std::string& dbid,
                                        const std::string& key,
                                        uint64_t ttl,
                                        std::vector<byte>::const_iterator begin,
                                        std::vector<byte>::const_iterator end) {
    std::stringstream ss;
    if (ttl != std::numeric_limits<uint64_t>::max()) {
      Command::fmtMultiBulkLen(ss, 5);
      Command::fmtBulk(ss, "RESTOREEX");
    } else {
      Command::fmtMultiBulkLen(ss, 4);
      Command::fmtBulk(ss, "RESTOREX");
    }

    Command::fmtBulk(ss, dbid);
    Command::fmtBulk(ss, key);
    if (ttl != std::numeric_limits<uint64_t>::max()) {
      Command::fmtBulk(ss, std::to_string(ttl));
    }
    Command::fmtBulk(ss, std::string(begin, end));
    return ss.str();
  }

  std::string fmtDumpxError(const std::string& dbid,
                            const std::string& key,
                            const Status& err) {
    INVARIANT(!err.ok());
    std::stringstream ss;
    Command::fmtMultiBulkLen(ss, 4);
    Command::fmtBulk(ss, "DUMPXERROR");
    Command::fmtBulk(ss, dbid);
    Command::fmtBulk(ss, key);
    Command::fmtBulk(ss, err.toString());
    return ss.str();
  }

  std::vector<int> getKeysFromCommand(const std::vector<std::string>& argv) {
    std::vector<int> index((argv.size() - 1) / 2);
    std::generate(
      index.begin(), index.end(), [n = 0]() mutable { return n += 2; });
    return index;
  }

  // args: dumpx [dbid key] withttl
  Expected<std::string> run(Session* sess) final {
    const auto& args = sess->getArgs();
    auto server = sess->getServerEntry();
    bool withttl = (args.size() - 1) % 2;

    auto index = getKeysFromCommand(args);
    auto locklist = server->getSegmentMgr()->getAllKeysLocked(
      sess, args, index, Command::RdLock());
    if (!locklist.ok()) {
      return locklist.status();
    }
    std::stringstream ss;
    std::vector<std::string> errorlist;
    std::vector<std::string> bufferlist;
    for (const auto& i : index) {
      auto expDbid = tendisplus::stoul(args[i - 1]);
      if (!expDbid.ok()) {
        errorlist.emplace_back(
          std::move(fmtDumpxError(args[i - 1], args[i], expDbid.status())));
        continue;
      }
      auto dbid = static_cast<uint32_t>(expDbid.value());
      if (sess->getCtx()->getDbId() != dbid) {
        sess->getCtx()->setDbId(dbid);
      }
      auto expdb = server->getSegmentMgr()->getDbHasLocked(sess, args[i]);
      if (!expdb.ok()) {
        errorlist.emplace_back(
          std::move(fmtDumpxError(args[i - 1], args[i], expdb.status())));
        continue;
      }
      auto exps = getSerializer(sess, args[i]);
      if (!exps.ok()) {
        if (exps.status().code() != ErrorCodes::ERR_EXPIRED ||
            exps.status().code() != ErrorCodes::ERR_NOTFOUND) {
          errorlist.emplace_back(
            std::move(fmtDumpxError(args[i - 1], args[i], exps.status())));
          continue;
        }
      }

      auto expBuf = exps.value()->dump(true);
      if (!expBuf.ok()) {
        errorlist.emplace_back(
          std::move(fmtDumpxError(args[i - 1], args[i], expBuf.status())));
        continue;
      }

      uint64_t ttl =
        withttl ? exps.value()->getTTL() : std::numeric_limits<uint64_t>::max();
      bufferlist.emplace_back(std::move(
        fmtRestorexSubCmd(args[i - 1],
                          args[i],
                          ttl,
                          expBuf.value().begin() + exps.value()->_begin,
                          expBuf.value().begin() + exps.value()->_end)));
    }
    /* we return keys can't be restored first */
    for (const auto& str : errorlist) {
      auto s = sess->setResponse(str);
      if (!s.ok()) {
        return s;
      }
    }

    for (const auto& str : bufferlist) {
      auto s = sess->setResponse(str);
      if (!s.ok()) {
        return s;
      }
    }
    return std::string();
  }
} dumpxCommand;

// derived classes, each of them should handle the dump of different object.
class KvSerializer : public Serializer {
 public:
  explicit KvSerializer(Session* sess, const std::string& key, RecordValue&& rv)
    : Serializer(
        sess, key, DumpType::RDB_TYPE_STRING, std::forward<RecordValue>(rv)) {}

  Expected<size_t> dumpObject(std::vector<byte>* payload) {
    Serializer::saveString(payload, &_pos, _rv.getValue());
    _begin = 0;
    return _pos - _begin;
  }
};

class ListSerializer : public Serializer {
 private:
  Expected<uint32_t> formatZiplist(std::vector<byte>* payload,
                                   size_t* pos,
                                   const std::vector<std::string>& zl,
                                   uint32_t byteSz) {
    std::vector<byte> ziplist;
    ziplist.reserve(byteSz);
    size_t tmpPos(0);
    size_t zlInitPos(0);
    tmpPos += 8;
    uint32_t zlbytes(10);
    uint32_t prevlen(0);
    uint16_t zllen = zl.size();
    easyCopy(&ziplist, &tmpPos, zllen);
    for (size_t i = 0; i < zl.size(); i++) {
      size_t written(0);
      if (prevlen >= 254) {
        written +=
          easyCopy(&ziplist, &tmpPos, static_cast<unsigned char>(0xfe));
        written += easyCopy(&ziplist, &tmpPos, prevlen);
      } else {
        written +=
          easyCopy(&ziplist, &tmpPos, static_cast<unsigned char>(prevlen));
      }
      written += saveString(&ziplist, &tmpPos, zl[i]);

      prevlen = written;
      zlbytes += written;
    }
    zlbytes += easyCopy(&ziplist, &tmpPos, static_cast<unsigned char>(0xff));
    uint32_t zltail(zlbytes - 1 - prevlen);
    easyCopy(&ziplist, &zlInitPos, zlbytes);
    easyCopy(&ziplist, &zlInitPos, zltail);

    size_t written(0);
    auto wr = Serializer::saveLen(payload, pos, ziplist.size());
    INVARIANT(wr.value() > 0);
    written += wr.value();
    written += easyCopy(payload, pos, ziplist.data(), ziplist.size());
    return written;
  }

 public:
  explicit ListSerializer(Session* sess,
                          const std::string& key,
                          RecordValue&& rv)
    : Serializer(sess,
                 key,
                 DumpType::RDB_TYPE_QUICKLIST,
                 std::forward<RecordValue>(rv)) {}

  Expected<size_t> dumpObject(std::vector<byte>* payload) {
    size_t qlbytes(0);
    size_t notAligned = _pos;
    size_t qlEnd = notAligned + 9;

    {
      // make room for quicklist length first,
      // remember to move first several bytes to align after.
      payload->resize(payload->size() + 9);
      _pos += 9;
    }

    auto expListMeta = ListMetaValue::decode(_rv.getValue());
    if (!expListMeta.ok()) {
      return expListMeta.status();
    }
    uint64_t tail = expListMeta.value().getTail();
    uint64_t head = expListMeta.value().getHead();
    uint64_t len = tail - head;
    INVARIANT_D(len > 0);
    if (len <= 0) {
      return {ErrorCodes::ERR_INTERNAL, "invalid list"};
    }

    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }
    PStore kvstore = expdb.value().store;

    auto ptxn = _sess->getCtx()->createTransaction(kvstore);
    if (!ptxn.ok()) {
      return ptxn.status();
    }

    /* in this loop we should emulate to build a quicklist(or to say, many
     * ziplists)
     * then compress it using lzf(not implemented), or just write raw to
     * buffer, both can work.*/

    uint32_t byteSz(0);
    uint32_t lenSz(0);
    std::vector<std::string> ziplist;
    size_t zlCnt(0);
    for (size_t i = head; i != tail; i++) {
      RecordKey nodeKey(expdb.value().chunkId,
                        _sess->getCtx()->getDbId(),
                        RecordType::RT_LIST_ELE,
                        _key,
                        std::to_string(i));
      auto expNodeVal = kvstore->getKV(nodeKey, ptxn.value());
      if (!expNodeVal.ok()) {
        return expNodeVal.status();
      }
      byteSz += expNodeVal.value().getValue().size();
      lenSz++;
      ziplist.emplace_back(std::move(expNodeVal.value().getValue()));
      if ((byteSz > ZLBYTE_LIMIT || lenSz > ZLLEN_LIMIT) || i == tail - 1) {
        ++zlCnt;
        auto ezlBytes = formatZiplist(payload, &_pos, ziplist, byteSz);
        if (!ezlBytes.ok()) {
          return ezlBytes.status();
        }
        qlbytes += ezlBytes.value();
        ziplist.clear();
        byteSz = 0;
        lenSz = 0;
      }
    }

    auto expQlUsed = saveLen(payload, &notAligned, zlCnt);
    if (!expQlUsed.ok()) {
      return expQlUsed.status();
    }
    if (expQlUsed.value() < 9) {
      std::copy_backward(payload->begin(),
                         payload->begin() + notAligned,
                         payload->begin() + qlEnd);
    }
    _begin = 9 - expQlUsed.value();
    _end = payload->size() - _begin;
    return qlbytes + expQlUsed.value();
  }
};

class SetSerializer : public Serializer {
 public:
  explicit SetSerializer(Session* sess,
                         const std::string& key,
                         RecordValue&& rv)
    : Serializer(
        sess, key, DumpType::RDB_TYPE_SET, std::forward<RecordValue>(rv)) {}

  Expected<size_t> dumpObject(std::vector<byte>* payload) {
    Expected<SetMetaValue> expMeta = SetMetaValue::decode(_rv.getValue());
    size_t len = expMeta.value().getCount();
    INVARIANT_D(len > 0);
    if (len <= 0) {
      return {ErrorCodes::ERR_INTERNAL, "invalid set"};
    }

    auto expwr = saveLen(payload, &_pos, len);
    if (!expwr.ok()) {
      return expwr.status();
    }

    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }
    PStore kvstore = expdb.value().store;
    auto ptxn = _sess->getCtx()->createTransaction(kvstore);
    if (!ptxn.ok()) {
      return ptxn.status();
    }

    auto cursor = ptxn.value()->createDataCursor();
    RecordKey fakeRk(expdb.value().chunkId,
                     _sess->getCtx()->getDbId(),
                     RecordType::RT_SET_ELE,
                     _key,
                     "");
    cursor->seek(fakeRk.prefixPk());
    while (true) {
      Expected<Record> eRcd = cursor->next();
      if (eRcd.status().code() == ErrorCodes::ERR_EXHAUST) {
        break;
      }
      if (!eRcd.ok()) {
        return eRcd.status();
      }
      Record& rcd = eRcd.value();
      const RecordKey& rcdKey = rcd.getRecordKey();
      if (rcdKey.prefixPk() != fakeRk.prefixPk()) {
        break;
      }

      const std::string& subk = rcdKey.getSecondaryKey();
      Serializer::saveString(payload, &_pos, subk);
    }

    _begin = 0;
    return _pos - _begin;
  }
};

class ZsetSerializer : public Serializer {
 public:
  explicit ZsetSerializer(Session* sess,
                          const std::string& key,
                          RecordValue&& rv)
    : Serializer(
        sess, key, DumpType::RDB_TYPE_ZSET, std::forward<RecordValue>(rv)) {}

  Expected<size_t> dumpObject(std::vector<byte>* payload) {
    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }
    PStore kvstore = expdb.value().store;
    auto ptxn = _sess->getCtx()->createTransaction(kvstore);
    if (!ptxn.ok()) {
      return ptxn.status();
    }

    auto eMeta = ZSlMetaValue::decode(_rv.getValue());
    if (!eMeta.ok()) {
      return eMeta.status();
    }
    ZSlMetaValue meta = eMeta.value();
    SkipList zsl(
      expdb.value().chunkId, _sess->getCtx()->getDbId(), _key, meta, kvstore);

    auto expwr = saveLen(payload, &_pos, zsl.getCount() - 1);
    if (!expwr.ok()) {
      return expwr.status();
    }

    auto rev = zsl.scanByRank(0, zsl.getCount() - 1, true, ptxn.value());
    if (!rev.ok()) {
      return rev.status();
    }
    for (auto& ele : rev.value()) {
      Serializer::saveString(
        payload, &_pos, std::forward<std::string>(ele.second));
      // save binary double score
      double score = ele.first;
      easyCopy(payload, &_pos, score);
    }
    _begin = 0;
    return _pos - _begin;
  }
};

class HashSerializer : public Serializer {
 public:
  explicit HashSerializer(Session* sess,
                          const std::string& key,
                          RecordValue&& rv)
    : Serializer(
        sess, key, DumpType::RDB_TYPE_HASH, std::forward<RecordValue>(rv)) {}

  Expected<size_t> dumpObject(std::vector<byte>* payload) {
    Expected<HashMetaValue> expHashMeta = HashMetaValue::decode(_rv.getValue());
    if (!expHashMeta.ok()) {
      return expHashMeta.status();
    }
    auto expwr = saveLen(payload, &_pos, expHashMeta.value().getCount());
    if (!expwr.ok()) {
      return expwr.status();
    }

    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }

    PStore kvstore = expdb.value().store;
    auto ptxn = _sess->getCtx()->createTransaction(kvstore);
    if (!ptxn.ok()) {
      return ptxn.status();
    }

    RecordKey fakeRk(expdb.value().chunkId,
                     _sess->getCtx()->getDbId(),
                     RecordType::RT_HASH_ELE,
                     _key,
                     "");
    auto cursor = ptxn.value()->createDataCursor();
    cursor->seek(fakeRk.prefixPk());
    while (true) {
      Expected<Record> expRcd = cursor->next();
      if (expRcd.status().code() == ErrorCodes::ERR_EXHAUST) {
        break;
      }
      if (!expRcd.ok()) {
        return expRcd.status();
      }
      if (expRcd.value().getRecordKey().prefixPk() != fakeRk.prefixPk()) {
        break;
      }
      const std::string& field =
        expRcd.value().getRecordKey().getSecondaryKey();
      const std::string& value = expRcd.value().getRecordValue().getValue();
      Serializer::saveString(payload, &_pos, field);
      Serializer::saveString(payload, &_pos, value);
    }
    _begin = 0;
    return _pos - _begin;
  }
};

// outlier function
Expected<std::unique_ptr<Serializer>> getSerializer(Session* sess,
                                                    const std::string& key) {
  Expected<RecordValue> rv =
    Command::expireKeyIfNeeded(sess, key, RecordType::RT_DATA_META);
  if (!rv.ok()) {
    return rv.status();
  }

  std::unique_ptr<Serializer> ptr;
  auto type = rv.value().getRecordType();
  switch (type) {
    case RecordType::RT_KV:
      ptr = std::move(std::unique_ptr<Serializer>(
        new KvSerializer(sess, key, std::move(rv.value()))));
      break;
    case RecordType::RT_LIST_META:
      ptr = std::move(std::unique_ptr<Serializer>(
        new ListSerializer(sess, key, std::move(rv.value()))));
      break;
    case RecordType::RT_HASH_META:
      ptr = std::move(std::unique_ptr<Serializer>(
        new HashSerializer(sess, key, std::move(rv.value()))));
      break;
    case RecordType::RT_SET_META:
      ptr = std::move(std::unique_ptr<Serializer>(
        new SetSerializer(sess, key, std::move(rv.value()))));
      break;
    case RecordType::RT_ZSET_META:
      ptr = std::move(std::unique_ptr<Serializer>(
        new ZsetSerializer(sess, key, std::move(rv.value()))));
      break;
    default:
      return {ErrorCodes::ERR_WRONG_TYPE, "type can not be dumped"};
  }

  return std::move(ptr);
}

// restore
Deserializer::Deserializer(Session* sess,
                           const std::string& payload,
                           const std::string& key,
                           const uint64_t ttl)
  : _sess(sess), _payload(payload), _key(key), _ttl(ttl), _pos(1) {}

Expected<DumpType> Deserializer::loadObjectType(const std::string& payload,
                                                size_t&& pos) {
  uint8_t t;
  easyCopy(&t, payload, &pos);
  return static_cast<DumpType>(t);
}

Expected<size_t> Deserializer::loadLen(const std::string& payload,
                                       size_t* pos,
                                       bool* isencoded) {
  byte buf[2];
  size_t ret;
  INVARIANT(easyCopy(&buf[0], payload, pos) == 1);
  uint8_t encType = static_cast<uint8_t>((buf[0] & 0xC0) >> 6);
  if (isencoded) {
    *isencoded = false;
  }
  /* Support ENCODING_INT and lzf(compressed) */
  if (encType == RDB_ENCVAL) {
    ret = buf[0] & 0x3F;
    if (isencoded) {
      *isencoded = true;
    }
  } else if (encType == RDB_6BITLEN) {
    ret = buf[0] & 0x3F;
  } else if (encType == RDB_14BITLEN) {
    INVARIANT(easyCopy(&buf[1], payload, pos) == 1);
    ret = ((buf[0] & 0x3F) << 8) | buf[1];
  } else if (buf[0] == RDB_32BITLEN) {
    uint32_t len32;
    INVARIANT(easyCopy(&len32, payload, pos) == 4);
    ret = ntohl(len32);
  } else if (buf[0] == RDB_64BITLEN) {
    uint64_t len64;
    INVARIANT(easyCopy(&len64, payload, pos) == 8);
    ret = redis_port::ntohll(len64);
  } else {
    return {ErrorCodes::ERR_INTERNAL, "Unknown length encoding"};
  }
  return ret;
}

std::string Deserializer::loadString(const std::string& payload, size_t* pos) {
  bool isencoded(false);
  auto expLen = Deserializer::loadLen(payload, pos, &isencoded);
  if (!expLen.ok()) {
    return std::string("");
  }
  size_t len = expLen.value();
  if (isencoded) {
    switch (static_cast<uint8_t>(len)) {
      case RDB_ENC_INT8:
      case RDB_ENC_INT16:
      case RDB_ENC_INT32: {
        /* transfer integer to string */
        auto eVal = loadIntegerString(payload, pos, len);
        if (!eVal.ok()) {
          return std::string();
        }
        return std::to_string(eVal.value());
        break;
      }
      case RDB_ENC_LZF: {
        auto expLzf = loadLzfString(payload, pos);
        if (!expLzf.ok()) {
          return std::string();
        }
        return expLzf.value();
        break;
      }
      default:
        LOG(INFO) << "Unknown encoding " << static_cast<int>(len);
    }
  }

  *pos += len;
  if (payload.begin() + *pos > payload.end()) {
    LOG(INFO) << "pos over limit";
    return std::string("");
  }
  return std::string(payload.begin() + *pos - len, payload.begin() + *pos);
}

Expected<int64_t> Deserializer::loadIntegerString(const std::string& payload,
                                                  size_t* pos,
                                                  uint8_t encType) {
  int64_t val;

  switch (encType) {
    case RDB_ENC_INT8: {
      int8_t enc = 0;
      easyCopy(&enc, payload, pos);
      val = static_cast<int64_t>(enc);
      break;
    }
    case RDB_ENC_INT16: {
      int16_t enc;
      easyCopy(&enc, payload, pos);
      val = static_cast<int64_t>(enc);
      break;
    }
    case RDB_ENC_INT32: {
      int32_t enc;
      easyCopy(&enc, payload, pos);
      val = static_cast<int64_t>(enc);
      break;
    }
    default:
      LOG(ERROR) << "Unknown ENCODING_INT type " << encType;
      return {ErrorCodes::ERR_PARSEPKT, "Unknown RDB integer encoding type"};
  }
  return val;
}

Expected<std::string> Deserializer::loadLzfString(const std::string& payload,
                                                  size_t* pos) {
  auto expClen = loadLen(payload, pos);
  RET_IF_ERR_EXPECTED(expClen);

  auto expLen = loadLen(payload, pos);
  RET_IF_ERR_EXPECTED(expLen);

  const auto lzfEnd = payload.begin() + *pos + expClen.value();
  if (lzfEnd > payload.cend()) {
    LOG(ERROR) << "Wrong lzf buffer length";
    return {ErrorCodes::ERR_PARSEOPT, "Wrong lzf buffer length"};
  }
  std::string lzfBuf(payload.begin() + *pos, lzfEnd);
  *pos += expClen.value();
  std::vector<char> outBuf(expLen.value(), '\0');

  if (redis_port::lzf_decompress(
        lzfBuf.c_str(), lzfBuf.size(), outBuf.data(), outBuf.size()) == 0) {
    LOG(ERROR) << "Invalid LZF";
    return {ErrorCodes::ERR_PARSEPKT, "Invalid LZF compressed string"};
  }

  return std::string(outBuf.begin(), outBuf.end());
}

class RestoreCommand : public Command {
 public:
  RestoreCommand() : Command("restore", "wm") {}

  ssize_t arity() const {
    return -4;
  }

  int32_t firstkey() const {
    return 1;
  }

  int32_t lastkey() const {
    return 1;
  }

  int32_t keystep() const {
    return 1;
  }

  static Status verifyDumpPayload(const std::string& payload) {
    uint8_t buf[2];
    uint64_t crc;

    size_t len = payload.size();
    if (len < 10) {
      return {ErrorCodes::ERR_INTERNAL, "len cannot be lt 10"};
    }
    size_t chkpos = len - 10;
    INVARIANT(easyCopy(&buf[0], payload, &chkpos) == 1 &&
              easyCopy(&buf[1], payload, &chkpos) == 1);

    uint16_t rdbver = (buf[1] << 8) | buf[0];
    if (rdbver > RDB_VERSION) {
      return {ErrorCodes::ERR_INTERNAL, "rdb version not match"};
    }

    crc = redis_port::crc64(
      0, reinterpret_cast<const byte*>(payload.c_str()), chkpos);
    if (memcmp(&crc, payload.c_str() + chkpos, 8) == 0) {
      return {ErrorCodes::ERR_OK, "OK"};
    }
    return {ErrorCodes::ERR_INTERNAL, "crc not match"};
  }

  Expected<std::string> run(Session* sess) final {
    const auto& args = sess->getArgs();
    const std::string& key = args[1];
    const std::string& sttl = args[2];
    const std::string& payload = args[3];
    bool replace(false);

    {
      for (size_t i = 4; i < args.size(); i++) {
        if (!::strcasecmp(args[i].c_str(), "replace")) {
          replace = true;
        } else {
          return {ErrorCodes::ERR_PARSEOPT, ""};
        }
      }
    }

    auto server = sess->getServerEntry();
    INVARIANT(server != nullptr);
    auto expdb = server->getSegmentMgr()->getDbWithKeyLock(
      sess, key, mgl::LockMode::LOCK_X);
    if (!expdb.ok()) {
      return expdb.status();
    }
    PStore kvstore = expdb.value().store;
    auto ptxn = sess->getCtx()->createTransaction(kvstore);
    if (!ptxn.ok()) {
      return ptxn.status();
    }

    // check if key exists
    Expected<RecordValue> rv =
      Command::expireKeyIfNeeded(sess, key, RecordType::RT_DATA_META);
    if (rv.status().code() != ErrorCodes::ERR_EXPIRED &&
        rv.status().code() != ErrorCodes::ERR_NOTFOUND) {
      if (!rv.ok()) {
        return rv.status();
      }
      if (replace) {
        Status s = delKey(sess, key, RecordType::RT_DATA_META, ptxn.value());
        if (!s.ok()) {
          return s;
        }
      } else {
        return Command::fmtBusyKey();
      }
    }

    Expected<int64_t> expttl = tendisplus::stoll(sttl);
    if (!expttl.ok()) {
      return expttl.status();
    }
    if (expttl.value() < 0) {
      return {ErrorCodes::ERR_PARSEPKT, "Invalid TTL value, must be >= 0"};
    }
    uint64_t ts = 0;
    if (expttl.value() != 0)
      ts = msSinceEpoch() + expttl.value();

    Status chk = RestoreCommand::verifyDumpPayload(payload);
    if (!chk.ok()) {
      return {ErrorCodes::ERR_PARSEPKT,
              "DUMP payload version or checksum are wrong"};
    }

    // do restore
    auto expds = getDeserializer(sess, payload, key, ts);
    if (!expds.ok()) {
      return expds.status();
    }

    Status res = expds.value()->restore(ptxn.value());
    if (!res.ok()) {
      return res;
    }
    auto eCmt = sess->getCtx()->commitTransaction(ptxn.value());
    if (!eCmt.ok()) {
      return eCmt.status();
    }
    return Command::fmtOK();
  }
} restoreCommand;

class KvDeserializer : public Deserializer {
 public:
  explicit KvDeserializer(Session* sess,
                          const std::string& payload,
                          const std::string& key,
                          const uint64_t ttl)
    : Deserializer(sess, payload, key, ttl) {}

  virtual Status restore(Transaction* txn) {
    auto ret = Deserializer::loadString(_payload, &_pos);
    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }
    PStore kvstore = expdb.value().store;

    SessionCtx* pCtx = _sess->getCtx();
    INVARIANT(pCtx != nullptr);

    RecordKey rk(
      expdb.value().chunkId, pCtx->getDbId(), RecordType::RT_KV, _key, "");
    RecordValue rv(ret, RecordType::RT_KV, pCtx->getVersionEP(), _ttl);
    for (int32_t i = 0; i < Command::RETRY_CNT; ++i) {
      Status s = kvstore->setKV(rk, rv, txn);
      if (!s.ok()) {
        return s;
      }
      return {ErrorCodes::ERR_OK, "OK"};
    }

    return {ErrorCodes::ERR_INTERNAL, "not reachable"};
  }
};

class SetDeserializer : public Deserializer {
 public:
  explicit SetDeserializer(Session* sess,
                           const std::string& payload,
                           const std::string& key,
                           const uint64_t ttl)
    : Deserializer(sess, payload, key, ttl) {}

  virtual Status restore(Transaction* txn) {
    auto expLen = Deserializer::loadLen(_payload, &_pos);
    if (!expLen.ok()) {
      return expLen.status();
    }

    size_t len = expLen.value();
    // std::vector<std::string> set(2);

    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }
    PStore kvstore = expdb.value().store;

    RecordKey metaRk(expdb.value().chunkId,
                     _sess->getCtx()->getDbId(),
                     RecordType::RT_SET_META,
                     _key,
                     "");
    SetMetaValue sm;

    for (size_t i = 0; i < len; i++) {
      std::string ele = loadString(_payload, &_pos);
      RecordKey rk(metaRk.getChunkId(),
                   metaRk.getDbId(),
                   RecordType::RT_SET_ELE,
                   metaRk.getPrimaryKey(),
                   std::move(ele));
      RecordValue rv("", RecordType::RT_SET_ELE, -1);
      Status s = kvstore->setKV(rk, rv, txn);
      if (!s.ok()) {
        return s;
      }
    }
    sm.setCount(len);
    Status s = kvstore->setKV(metaRk,
                              RecordValue(sm.encode(),
                                          RecordType::RT_SET_META,
                                          _sess->getCtx()->getVersionEP(),
                                          _ttl),
                              txn);
    if (!s.ok()) {
      return s;
    }
    return {ErrorCodes::ERR_OK, "OK"};
  }
};

class ZsetDeserializer : public Deserializer {
 public:
  explicit ZsetDeserializer(Session* sess,
                            const std::string& payload,
                            const std::string& key,
                            const uint64_t ttl)
    : Deserializer(sess, payload, key, ttl) {}

  virtual Status restore(Transaction* txn) {
    auto expLen = loadLen(_payload, &_pos);
    if (!expLen.ok()) {
      return expLen.status();
    }

    size_t len = expLen.value();
    std::map<std::string, double> scoreMap;
    while (len--) {
      std::string ele = loadString(_payload, &_pos);
      double score;
      INVARIANT(easyCopy(&score, _payload, &_pos) == 8);
      scoreMap[ele] = score;
    }
    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }

    RecordKey rk(expdb.value().chunkId,
                 _sess->getCtx()->getDbId(),
                 RecordType::RT_ZSET_META,
                 _key,
                 "");
    PStore kvstore = expdb.value().store;
    // set ttl first

    Expected<RecordValue> eMeta = kvstore->getKV(rk, txn);
    if (!eMeta.ok() && eMeta.status().code() != ErrorCodes::ERR_NOTFOUND) {
      return eMeta.status();
    }
    INVARIANT_D(eMeta.status().code() == ErrorCodes::ERR_NOTFOUND);
    ZSlMetaValue meta(1, 1, 0);
    RecordValue rv(meta.encode(),
                   RecordType::RT_ZSET_META,
                   _sess->getCtx()->getVersionEP(),
                   _ttl);
    Status s = kvstore->setKV(rk, rv, txn);
    if (!s.ok()) {
      return s;
    }
    RecordKey headRk(rk.getChunkId(),
                     rk.getDbId(),
                     RecordType::RT_ZSET_S_ELE,
                     rk.getPrimaryKey(),
                     std::to_string(ZSlMetaValue::HEAD_ID));
    ZSlEleValue headVal;
    RecordValue headRv(headVal.encode(), RecordType::RT_ZSET_S_ELE, -1);
    s = kvstore->setKV(headRk, headRv, txn);
    if (!s.ok()) {
      return s;
    }

    for (int32_t i = 0; i < Command::RETRY_CNT; ++i) {
      // maybe very slow
      Expected<std::string> res =
        genericZadd(_sess, kvstore, rk, rv, scoreMap, ZADD_NX, txn);
      if (!res.ok()) {
        return res.status();
      }
      return {ErrorCodes::ERR_OK, "OK"};
    }

    return {ErrorCodes::ERR_INTERNAL, "not reachable"};
  }
};

class HashDeserializer : public Deserializer {
 public:
  explicit HashDeserializer(Session* sess,
                            const std::string& payload,
                            const std::string& key,
                            const uint64_t ttl)
    : Deserializer(sess, payload, key, ttl) {}

  virtual Status restore(Transaction* txn) {
    auto expLen = loadLen(_payload, &_pos);
    if (!expLen.ok()) {
      return expLen.status();
    }

    size_t len = expLen.value();
    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }
    PStore kvstore = expdb.value().store;
    for (size_t i = 0; i < len; i++) {
      std::string field = loadString(_payload, &_pos);
      std::string value = loadString(_payload, &_pos);
      // need existence check ?
      RecordKey rk(expdb.value().chunkId,
                   _sess->getCtx()->getDbId(),
                   RecordType::RT_HASH_ELE,
                   _key,
                   field);
      RecordValue rv(value, RecordType::RT_HASH_ELE, -1);
      Status s = kvstore->setKV(rk, rv, txn);
      if (!s.ok()) {
        return s;
      }
    }

    RecordKey metaRk(expdb.value().chunkId,
                     _sess->getCtx()->getDbId(),
                     RecordType::RT_HASH_META,
                     _key,
                     "");
    HashMetaValue hashMeta;
    hashMeta.setCount(len);
    RecordValue metaRv(std::move(hashMeta.encode()),
                       RecordType::RT_HASH_META,
                       _sess->getCtx()->getVersionEP(),
                       _ttl);
    Status s = kvstore->setKV(metaRk, metaRv, txn);
    if (!s.ok()) {
      return s;
    }
    return {ErrorCodes::ERR_OK, "OK"};
  }
};

class ListDeserializer : public Deserializer {
  Expected<std::vector<std::string>> deserializeZiplist(
    const std::string& payload, size_t* pos) {
    uint32_t zlbytes(0), zltail(0);
    uint16_t zllen(0);
    std::vector<std::string> zl;
    INVARIANT(easyCopy(&zlbytes, payload, pos) == 4 &&
              easyCopy(&zltail, payload, pos) == 4 &&
              easyCopy(&zllen, payload, pos) == 2);

    zl.reserve(zllen);
    uint32_t prevlen(0);
    while (zllen--) {
      if (prevlen >= 254) {
        *pos += 5;
      } else {
        *pos += 1;
      }
      std::string val;
      const unsigned char& encoding = payload.at(*(pos));
      if (static_cast<uint8_t>(encoding) < ZIP_STR_MASK) {
        val = loadString(payload, pos);
      } else {
        (*pos)++;
        switch (encoding) {
          case ZIP_INT_8B: {
            int8_t intEntry = 0;
            easyCopy(&intEntry, payload, pos);
            val = std::to_string(intEntry);
            break;
          }
          case ZIP_INT_16B: {
            int16_t intEntry;
            easyCopy(&intEntry, payload, pos);
            val = std::to_string(intEntry);
            break;
          }
          case ZIP_INT_24B: {
            uint8_t data[3];
            int32_t intEntry;
            for (int i = 0; i < 3; i++) {
              easyCopy(&data[i], payload, pos);
            }
            // Read sequence: low addr -> high addr, so data[2] gets
            // left shift
            intEntry = (data[2] << 16) | (data[1] << 8) | (data[0]);
            val = std::to_string(intEntry);
            break;
          }
          case ZIP_INT_32B: {
            int32_t intEntry;
            easyCopy(&intEntry, payload, pos);
            val = std::to_string(intEntry);
            break;
          }
          case ZIP_INT_64B: {
            int64_t intEntry;
            easyCopy(&intEntry, payload, pos);
            val = std::to_string(intEntry);
            break;
          }
          default: {
            if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {
              uint8_t intEntry;
              intEntry = (encoding & ZIP_INT_IMM_MASK) - 1;
              val = std::to_string(intEntry);
            } else {
              LOG(INFO) << "Invalid integer encoding "
                        << static_cast<uint8_t>(encoding);
              return {ErrorCodes::ERR_PARSEPKT, "Invalid integer encoding"};
            }
            break;
          }
        }
      }
      prevlen = val.size();
      zl.push_back(std::move(val));
    }
    uint8_t zlend(0);
    INVARIANT(easyCopy(&zlend, payload, pos) == 1);
    INVARIANT(zlend == 0xff);
    return zl;
  }

 public:
  explicit ListDeserializer(Session* sess,
                            const std::string& payload,
                            const std::string& key,
                            const uint64_t ttl)
    : Deserializer(sess, payload, key, ttl) {}

  virtual Status restore(Transaction* txn) {
    auto qlExpLen = loadLen(_payload, &_pos);
    if (!qlExpLen.ok()) {
      return qlExpLen.status();
    }

    size_t qlLen = qlExpLen.value();
    auto server = _sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbHasLocked(_sess, _key);
    if (!expdb.ok()) {
      return expdb.status();
    }
    RecordKey metaRk(expdb.value().chunkId,
                     _sess->getCtx()->getDbId(),
                     RecordType::RT_LIST_META,
                     _key,
                     "");
    PStore kvstore = expdb.value().store;
    ListMetaValue lm(INITSEQ, INITSEQ);

    uint64_t head = lm.getHead();
    uint64_t tail = lm.getTail();
    while (qlLen--) {
      auto zlist = loadString(_payload, &_pos);
      size_t pos = 0;
      auto expZl = deserializeZiplist(zlist, &pos);
      if (!expZl.ok()) {
        LOG(ERROR) << "Restore list failed, " << expZl.status().toString();
        return expZl.status();
      }
      const auto& zl = expZl.value();
      uint64_t idx;
      for (auto iter = zl.begin(); iter != zl.end(); iter++) {
        idx = tail++;
        RecordKey rk(metaRk.getChunkId(),
                     metaRk.getDbId(),
                     RecordType::RT_LIST_ELE,
                     metaRk.getPrimaryKey(),
                     std::to_string(idx));
        RecordValue rv(std::move(*iter), RecordType::RT_LIST_ELE, -1);
        Status s = kvstore->setKV(rk, rv, txn);
        if (!s.ok()) {
          return s;
        }
      }
    }
    lm.setHead(head);
    lm.setTail(tail);
    RecordValue metaRv(lm.encode(),
                       RecordType::RT_LIST_META,
                       _sess->getCtx()->getVersionEP(),
                       _ttl);
    Status s = kvstore->setKV(metaRk, metaRv, txn);
    if (!s.ok()) {
      return s;
    }
    return {ErrorCodes::ERR_OK, "OK"};
  }
};

Expected<std::unique_ptr<Deserializer>> getDeserializer(
  Session* sess,
  const std::string& payload,
  const std::string& key,
  const uint64_t ttl) {
  Expected<DumpType> expType = Deserializer::loadObjectType(payload, 0);
  if (!expType.ok()) {
    return expType.status();
  }
  std::unique_ptr<Deserializer> ptr;
  DumpType type = std::move(expType.value());
  switch (type) {
    case DumpType::RDB_TYPE_STRING:
      ptr = std::move(std::unique_ptr<Deserializer>(
        new KvDeserializer(sess, payload, key, ttl)));
      break;
    case DumpType::RDB_TYPE_SET:
      ptr = std::move(std::unique_ptr<Deserializer>(
        new SetDeserializer(sess, payload, key, ttl)));
      break;
    case DumpType::RDB_TYPE_ZSET:
      ptr = std::move(std::unique_ptr<Deserializer>(
        new ZsetDeserializer(sess, payload, key, ttl)));
      break;
    case DumpType::RDB_TYPE_HASH:
      ptr = std::move(std::unique_ptr<Deserializer>(
        new HashDeserializer(sess, payload, key, ttl)));
      break;
    case DumpType::RDB_TYPE_QUICKLIST:
      ptr = std::move(std::unique_ptr<Deserializer>(
        new ListDeserializer(sess, payload, key, ttl)));
      break;
    default:
      return {ErrorCodes::ERR_INTERNAL, "Not implemented"};
  }
  return std::move(ptr);
}

class RestoreMetaCommand : public Command {
 public:
  RestoreMetaCommand() : Command("restoremeta", "rs") {}

  ssize_t arity() const {
    return -2;
  }

  int32_t firstkey() const {
    return 0;
  }

  int32_t lastkey() const {
    return 0;
  }

  int32_t keystep() const {
    return 0;
  }

  Expected<std::string> run(Session* sess) final {
    auto eSlotId = tendisplus::stoul(sess->getArgs()[1]);
    if (!eSlotId.ok()) {
      LOG(ERROR) << "read slot id failed for " << eSlotId.status().toString();
      return eSlotId.status();
    }

    auto slotId = static_cast<uint32_t>(eSlotId.value());
    auto server = sess->getServerEntry();
    // TODO(VINCHEN): should use interface
    auto dbId = slotId % (server->getKVStoreCount());
    bool getBigKey = false;
    uint64_t valueSize = -1;
    uint64_t eleCnt = -1;

    if (sess->getArgs().size() > 2) {
      if (sess->getArgs().size() != 5) {
        return {ErrorCodes::ERR_PARSEOPT, "invalid args size"};
      }
      auto arg2 = toLower(sess->getArgs()[2]);
      if (arg2 != "bigkeys") {
        return {ErrorCodes::ERR_PARSEOPT, "arg2 is not bigkeys"};
      }
      getBigKey = true;

      auto arg3 = tendisplus::stoul(sess->getArgs()[3]);
      if (!arg3.ok()) {
        return arg3.status();
      }

      valueSize = arg3.value();

      auto arg4 = tendisplus::stoul(sess->getArgs()[4]);
      if (!arg4.ok()) {
        return arg4.status();
      }

      eleCnt = arg4.value();
    }

    auto expdb =
      server->getSegmentMgr()->getDb(sess, dbId, mgl::LockMode::LOCK_IS);
    if (!expdb.ok()) {
      return expdb.status();
    }

    PStore kvstore = expdb.value().store;
    auto ptxn = sess->getCtx()->createTransaction(kvstore);
    if (!ptxn.ok()) {
      return ptxn.status();
    }

    auto cursor = ptxn.value()->createDataCursor();

    RecordKey tmplRk(slotId, 0, RecordType::RT_DATA_META, "", "");
    auto prefix = tmplRk.prefixSlotType();
    cursor->seek(prefix);

    std::stringstream ss;
    Command::fmtMultiBulkLen(ss, 1);
    Command::fmtBulk(ss, "RESTOREMETASLOT");
    auto s = sess->setResponse(ss.str());
    if (!s.ok()) {
      return s;
    }
    ss.str(std::string());
    std::vector<byte> bulkBuf;
    while (true) {
      Expected<Record> expRcd = cursor->next();
      if (expRcd.status().code() == ErrorCodes::ERR_EXHAUST) {
        break;
      }
      if (!expRcd.ok()) {
        return expRcd.status();
      }
      const auto& key = expRcd.value().getRecordKey();
      const auto& value = expRcd.value().getRecordValue();
      if (key.getChunkId() != slotId) {
        // means we reach the end of a slot.
        break;
      }

      const auto& valueType = value.getRecordType();
      if (!isDataMetaType(valueType)) {
        // NOTE(vinchen): Because RecordType::RT_DATA_META is first elements in
        // one slot
        INVARIANT_D(key.getRecordType() != RecordType::RT_DATA_META);
        break;
      }

      INVARIANT_D(key.getRecordType() == RecordType::RT_DATA_META);

      std::vector<byte> buf;
      size_t pos(0);
      Serializer::saveString(&buf, &pos, key.getPrimaryKey());
      buf.reserve(buf.size() + 17);
      easyCopy(&buf, &pos, value.getVersionEP());
      easyCopy(&buf, &pos, value.getTtl());
      uint8_t typeMask(0);
      switch (valueType) {
        case RecordType::RT_HASH_META:
          typeMask = 4 << 4;
          break;
        case RecordType::RT_LIST_META:
          typeMask = 1 << 4;
          break;
        case RecordType::RT_SET_META:
          typeMask = 2 << 4;
          break;
        case RecordType::RT_ZSET_META:
          typeMask = 3 << 4;
          break;
        case RecordType::RT_KV:
          typeMask = 0 << 4;
          break;
        default:
          LOG(ERROR) << "get invalid record type"
                     << rt2Char(value.getRecordType()) << "in iteration";
          INVARIANT(0);
      }
      INVARIANT_D((typeMask & 0x01) == 0);
      if (getBigKey) {
        if (value.isBigKey(valueSize, eleCnt)) {
          // mean is is a big key
          typeMask |= 0x01;
        }
      }

      easyCopy(&buf, &pos, typeMask);
      if (bulkBuf.size() + buf.size() > 8 * 1024) {
        ss.str(std::string());
        Command::fmtMultiBulkLen(ss, 1);
        Command::fmtBulk(ss, std::string(bulkBuf.begin(), bulkBuf.end()));
        s = sess->setResponse(ss.str());
        if (!s.ok()) {
          return s;
        }
        bulkBuf.clear();
      }
      bulkBuf.insert(bulkBuf.end(), buf.begin(), buf.end());
    }
    ss.str(std::string());
    Command::fmtMultiBulkLen(ss, 1);
    Command::fmtBulk(ss, std::string(bulkBuf.begin(), bulkBuf.end()));
    Command::fmtMultiBulkLen(ss, 1);
    Command::fmtBulk(ss, "RESTOREMETAEND");
    return ss.str();
  }
} restoremetaCommand;

Expected<std::string> recordList2Aof(const std::list<Record>& list) {
  if (list.size() == 0) {
    return std::string("");
  }

  std::stringstream ss;
  auto type = list.front().getRecordValue().getRecordType();
  auto key = list.front().getRecordKey().getPrimaryKey();
  switch (type) {
    case tendisplus::RecordType::RT_KV:
      INVARIANT_D(list.size() == 1);
      Command::fmtMultiBulkLen(ss, 2 + list.size());
      Command::fmtBulk(ss, "SET");
      Command::fmtBulk(ss, key);
      break;

    case tendisplus::RecordType::RT_LIST_ELE:
      Command::fmtMultiBulkLen(ss, 2 + list.size());
      Command::fmtBulk(ss, "RPUSH");
      Command::fmtBulk(ss, key);
      break;

    case tendisplus::RecordType::RT_HASH_ELE:
      Command::fmtMultiBulkLen(ss, 2 + list.size() * 2);
      Command::fmtBulk(ss, "HMSET");
      Command::fmtBulk(ss, key);
      break;

    case tendisplus::RecordType::RT_SET_ELE:
      Command::fmtMultiBulkLen(ss, 2 + list.size());
      Command::fmtBulk(ss, "SADD");
      Command::fmtBulk(ss, key);
      break;

    case tendisplus::RecordType::RT_ZSET_H_ELE:
      Command::fmtMultiBulkLen(ss, 2 + list.size() * 2);
      Command::fmtBulk(ss, "ZADD");
      Command::fmtBulk(ss, key);
      break;

    default:
      INVARIANT_D(0);
      break;
  }

  for (const auto& rt : list) {
    // key and type of all records in the list is the same
    INVARIANT_D(rt.getRecordValue().getRecordType() == type);
    INVARIANT_D(rt.getRecordKey().getPrimaryKey() == key);

    const auto& rtKey = rt.getRecordKey();
    const auto& rtValue = rt.getRecordValue();

    switch (type) {
      case tendisplus::RecordType::RT_KV:
        Command::fmtBulk(ss, rtValue.getValue());
        break;

      case tendisplus::RecordType::RT_LIST_ELE:
        Command::fmtBulk(ss, rtValue.getValue());
        break;

      case tendisplus::RecordType::RT_HASH_ELE:
        Command::fmtBulk(ss, rtKey.getSecondaryKey());
        Command::fmtBulk(ss, rtValue.getValue());
        break;

      case tendisplus::RecordType::RT_SET_ELE:
        Command::fmtBulk(ss, rtKey.getSecondaryKey());
        break;

      case tendisplus::RecordType::RT_ZSET_H_ELE: {
        Expected<double> oldScore =
          ::tendisplus::doubleDecode(rtValue.getValue());
        if (!oldScore.ok()) {
          return oldScore.status();
        }
        Command::fmtBulk(ss, ::tendisplus::dtos(oldScore.value()));

        Command::fmtBulk(ss, rtKey.getSecondaryKey());
        break;
      }

      default:
        INVARIANT_D(0);
        break;
    }
  }

  return ss.str();
}

Expected<std::string> key2Aof(Session* sess, const std::string& key) {
  auto dbid = sess->getCtx()->getDbId();
  auto server = sess->getServerEntry();
  auto expdb = server->getSegmentMgr()->getDbWithKeyLock(
    sess, key, mgl::LockMode::LOCK_NONE);
  if (!expdb.ok()) {
    return expdb.status();
  }

  auto kvstore = expdb.value().store;
  auto ptxn = sess->getCtx()->createTransaction(kvstore);
  if (!ptxn.ok()) {
    return ptxn.status();
  }

  RecordKey mk(expdb.value().chunkId, dbid, RecordType::RT_DATA_META, key, "");
  auto eValue = kvstore->getKV(mk, ptxn.value());
  RET_IF_ERR_EXPECTED(eValue);

  auto type = eValue.value().getEleType();

  RecordKey fakeEle(expdb.value().chunkId, dbid, type, key, "");
  std::string prefix = fakeEle.prefixPk();
  auto cursor = ptxn.value()->createDataCursor();
  cursor->seek(prefix);

  std::list<Record> result;
  uint64_t count = 0;
  while (true) {
    Expected<Record> exptRcd = cursor->next();
    if (exptRcd.status().code() == ErrorCodes::ERR_EXHAUST) {
      break;
    }
    if (!exptRcd.ok()) {
      return exptRcd.status();
    }
    Record& rcd = exptRcd.value();
    const RecordKey& rcdKey = rcd.getRecordKey();
    if (rcdKey.prefixPk() != prefix) {
      break;
    }
    count++;
    result.emplace_back(std::move(rcd));
  }

  INVARIANT_D(result.size() > 0);
  auto ret = recordList2Aof(result);
  if (!ret.ok()) {
    return ret.status();
  }

  auto ttl = eValue.value().getTtl();
  if (ttl == 0) {
    // no expire
    return ret.value();
  }

  std::stringstream ss;
  ss << ret.value();

  /* pexpireat */
  std::stringstream ss1;
  Command::fmtMultiBulkLen(ss1, 3);
  Command::fmtBulk(ss1, "PEXPIREAT");
  Command::fmtBulk(ss1, key);
  Command::fmtBulk(ss1, std::to_string(ttl));
  ss << ss1.str();

  return ss.str();
}

class RestoreValueCommand : public Command {
 public:
  RestoreValueCommand() : Command("restorevalue", "rs") {}

  ssize_t arity() const {
    return 2;
  }

  int32_t firstkey() const {
    return 1;
  }

  int32_t lastkey() const {
    return 1;
  }

  int32_t keystep() const {
    return 0;
  }

  Expected<std::string> run(Session* sess) final {
    auto& key = (sess->getArgs()[1]);
    auto server = sess->getServerEntry();
    auto expdb = server->getSegmentMgr()->getDbWithKeyLock(sess, key, RdLock());
    RET_IF_ERR_EXPECTED(expdb);
    auto slotId = expdb.value().chunkId;

    SessionCtx* pCtx = sess->getCtx();
    INVARIANT(pCtx != nullptr);

    Expected<RecordValue> rv =
      Command::expireKeyIfNeeded(sess, key, RecordType::RT_DATA_META);
    RET_IF_ERR_EXPECTED(rv);

    PStore kvstore = expdb.value().store;
    auto ptxn = sess->getCtx()->createTransaction(kvstore);
    RET_IF_ERR_EXPECTED(ptxn);

    RecordKey fakeEle(
      slotId, pCtx->getDbId(), rv.value().getEleType(), key, "");
    std::string prefix = fakeEle.prefixPk();
    auto cursor = ptxn.value()->createDataCursor();
    cursor->seek(prefix);

    /* 1.restorevalue_begin  */
    std::stringstream ss;
    Command::fmtMultiBulkLen(ss, 2);
    Command::fmtBulk(ss, "RESTOREVALUE_BEGIN");
    Command::fmtBulk(ss, key);
    auto s = sess->setResponse(ss.str());
    RET_IF_ERR(s);

    ss.str(std::string());

    /* 2. set/hmset/sadd/rpush/zadd *n */
    std::list<Record> result;
    uint64_t count = 0;
    while (true) {
      Expected<Record> exptRcd = cursor->next();
      if (exptRcd.status().code() == ErrorCodes::ERR_EXHAUST) {
        break;
      }
      RET_IF_ERR_EXPECTED(exptRcd);

      Record& rcd = exptRcd.value();
      const RecordKey& rcdKey = rcd.getRecordKey();
      if (rcdKey.prefixPk() != prefix) {
        break;
      }
      count++;
      result.emplace_back(std::move(rcd));
      if (result.size() >= 1000) {
        auto eAof = recordList2Aof(result);
        RET_IF_ERR_EXPECTED(eAof);

        auto s = sess->setResponse(eAof.value());
        RET_IF_ERR(s);

        result.clear();
      }
    }
    INVARIANT_D(count == rv.value().getEleCnt());

    if (result.size() > 0) {
      auto eAof = recordList2Aof(result);
      RET_IF_ERR_EXPECTED(eAof);

      s = sess->setResponse(eAof.value());
      RET_IF_ERR(s);

      result.clear();
    }

    /* 3.pexpireat */
    auto ttl = rv.value().getTtl();
    if (ttl > 0) {
      ss.str(std::string());
      Command::fmtMultiBulkLen(ss, 3);
      Command::fmtBulk(ss, "PEXPIREAT");
      Command::fmtBulk(ss, key);
      Command::fmtBulk(ss, std::to_string(ttl));
      s = sess->setResponse(ss.str());
      RET_IF_ERR(s);
    }

    /* 4.restorevalue_end */
    auto reversion = rv.value().getVersionEP();
    ss.str(std::string());
    Command::fmtMultiBulkLen(ss, 4);
    Command::fmtBulk(ss, "RESTOREVALUE_END");
    Command::fmtBulk(ss, key);
    Command::fmtBulk(ss, std::to_string(reversion));
    Command::fmtBulk(ss, std::to_string(ttl));
    return ss.str();
  }
} restorevalueCommand;

class IncrMetaCommand : public Command {
 public:
  IncrMetaCommand() : Command("incrmeta", "rs") {}

  ssize_t arity() const {
    return 3;
  }

  int32_t firstkey() const {
    return 0;
  }

  int32_t lastkey() const {
    return 0;
  }

  int32_t keystep() const {
    return 0;
  }

  Expected<std::unordered_set<IncrMeta, IncrMetaHash>> serializeKvStoreBinlog(
    Transaction* txn, uint64_t startPos, uint64_t startRevision) {
    std::unordered_set<IncrMeta, IncrMetaHash> incrKeys;

    // scan binlog from startPos
    std::unique_ptr<RepllogCursorV2> cursor =
      txn->createRepllogCursorV2(startPos);
    while (true) {
      Expected<ReplLogRawV2> explog = cursor->next();
      if (explog.ok()) {
        auto logKey = explog.value().getReplLogKey();
        auto logValue = explog.value().getReplLogValue();
        auto key = ReplLogKeyV2::decode(logKey);
        if (!key.ok()) {
          LOG(ERROR) << "ReplLogKeyV2::decode failed:"
                     << key.status().toString();
          return key.status();
        }
        auto value = ReplLogValueV2::decode(logValue);
        if (!value.ok()) {
          return value.status();
        }

        // parse all ReplLogValueEntryV2 && add keys to incrKeys
        size_t offset = value.value().getHdrSize();
        auto data = value.value().getData();
        size_t dataSize = value.value().getDataSize();
        while (offset < dataSize) {
          size_t size = 0;
          auto entry = ReplLogValueEntryV2::decode(
            (const char*)data + offset, dataSize - offset, &size);
          if (!entry.ok()) {
            return entry.status();
          }
          offset += size;
          auto eMeta = parseReplLogValueEntryV2(entry.value(), startRevision);
          if (eMeta.ok()) {
            auto meta = eMeta.value();
            auto search = incrKeys.find(meta);
            // if keys appear multi times, use the max version one
            if (search != incrKeys.end()) {
              if (search->_version < meta._version ||
                  meta._op == ReplOp::REPL_OP_DEL) {
                incrKeys.erase(search);
              }
            }
            incrKeys.insert(std::move(eMeta.value()));
          } else {
            // no meta
            continue;
          }
        }

        if (offset != dataSize) {
          return {ErrorCodes::ERR_INTERNAL, "bad binlog"};
        }
      } else if (explog.status().code() == ErrorCodes::ERR_EXHAUST) {
        // no more data
        break;
      }
    }

    return incrKeys;
  }

  // parse ReplLogValueEntryV2 key smaller than startRevision will filterd
  Expected<IncrMeta> parseReplLogValueEntryV2(const ReplLogValueEntryV2& entry,
                                              uint64_t startRevision) {
    switch (entry.getOp()) {
      case ReplOp::REPL_OP_SET: {
        Expected<RecordKey> opkey = RecordKey::decode(entry.getOpKey());
        if (!opkey.ok()) {
          return opkey.status();
        }

        auto key = opkey.value();
        if (key.getRecordType() == RecordType::RT_DATA_META) {
          Expected<RecordValue> opvalue =
            RecordValue::decode(entry.getOpValue());
          if (!opvalue.ok()) {
            return opvalue.status();
          }
          const auto& value = opvalue.value();
          uint64_t version = value.getVersionEP();
          uint64_t ttl = value.getTtl();
          uint8_t type = decodeType(value.getRecordType());
          // key's revision smaller than startRevision skip it
          if (version > startRevision) {
            IncrMeta im(key.getPrimaryKey(),
                        ReplOp::REPL_OP_SET,
                        type,
                        entry.getTimestamp(),
                        ttl,
                        version);
            return im;
          }
        }

        break;
      }
      case ReplOp::REPL_OP_DEL: {
        Expected<RecordKey> opkey = RecordKey::decode(entry.getOpKey());
        if (!opkey.ok()) {
          return opkey.status();
        }

        auto key = opkey.value();
        // delete keys type is RT_DATA_META, we cann't get it's type, so set
        uint8_t type = decodeType(RecordType::RT_SET_META);
        if (key.getRecordType() == RecordType::RT_DATA_META) {
          // TODO(jingjunli): key's revision smaller than startRevision skip
          // it delete keys 's ttl && version is 0
          IncrMeta im(key.getPrimaryKey(),
                      ReplOp::REPL_OP_DEL,
                      type,
                      entry.getTimestamp(),
                      0,
                      0);
          return im;
        }
        break;
      }
      case ReplOp::REPL_OP_STMT: {
        break;
      }
      case ReplOp::REPL_OP_SPEC: {
        break;
      }
      case ReplOp::REPL_OP_DEL_RANGE: {
        break;
      }
      default:
        INVARIANT_D(0);
        return {ErrorCodes::ERR_DECODE, "not a valid binlog"};
    }

    return {ErrorCodes::ERR_EXHAUST, "end not has data"};
  }

  // incrmeta startRevision endRevision
  Expected<std::string> run(Session* sess) final {
    const auto& args = sess->getArgs();

    if (args.size() < 3) {
      return {ErrorCodes::ERR_PARSEPKT, "invalid set params"};
    }

    auto eStartRevision = tendisplus::stoul(args[1]);
    auto eEndRevision = tendisplus::stoul(args[2]);
    if (!eStartRevision.ok() || !eEndRevision.ok()) {
      auto errStatus =
        eStartRevision.ok() ? eEndRevision.status() : eStartRevision.status();
      LOG(ERROR) << "param is not integer or out of range start: "
                 << errStatus.toString();
      return errStatus;
    }

    auto startRevision = static_cast<uint64_t>(eStartRevision.value());
    auto endRevision = static_cast<uint64_t>(eEndRevision.value());
    uint64_t diffPos = endRevision - startRevision;
    auto server = sess->getServerEntry();
    std::stringstream ss;

    Command::fmtMultiBulkLen(ss, 1);
    Command::fmtBulk(ss, "INCRMETASTART");
    auto s = sess->setResponse(ss.str());
    if (!s.ok()) {
      return s;
    }
    ss.str(std::string());

    // payload buf
    std::vector<byte> bulkBuf;
    for (uint32_t i = 0; i < server->getKVStoreCount(); ++i) {
      auto expdb =
        server->getSegmentMgr()->getDb(sess, i, mgl::LockMode::LOCK_IX);
      if (!expdb.ok()) {
        return expdb.status();
      }

      PStore kvstore = expdb.value().store;
      auto ptxn = sess->getCtx()->createTransaction(kvstore);
      if (!ptxn.ok()) {
        return ptxn.status();
      }

      auto maxBinlogSeq = kvstore->getNextBinlogSeq();
      uint64_t startBinlogPos =
        maxBinlogSeq > diffPos ? maxBinlogSeq - diffPos : 0;
      auto incrKeys =
        serializeKvStoreBinlog(ptxn.value(), startBinlogPos, startRevision);
      if (!incrKeys.ok()) {
        return incrKeys.status();
      }

      for (auto key : incrKeys.value()) {
        std::vector<byte> buf;
        size_t pos(0);
        // encode : key version ttl keyType opType(incr/del)
        Serializer::saveString(&buf, &pos, key._key);
        easyCopy(&buf, &pos, key._version);
        easyCopy(&buf, &pos, key._ttl);
        easyCopy(&buf, &pos, key._type);
        easyCopy(&buf, &pos, key._op);
#ifdef TENDIS_DEBUG
        LOG(INFO) << "Add primary key:"
                  << "key: " << key._key << " version:" << key._version
                  << "type:" << static_cast<int>((key._type >> 4) & 0x0f)
                  << " op:" << static_cast<int>(key._op);
#endif
        if (bulkBuf.size() + buf.size() > 8 * 1024) {
          ss.str(std::string());
          Command::fmtMultiBulkLen(ss, 1);
          Command::fmtBulk(ss, std::string(bulkBuf.begin(), bulkBuf.end()));
          auto s = sess->setResponse(ss.str());
          if (!s.ok()) {
            return s;
          }
          bulkBuf.clear();
        }
        bulkBuf.insert(bulkBuf.end(), buf.begin(), buf.end());
      }
    }

    ss.str(std::string());
    Command::fmtMultiBulkLen(ss, 1);
    Command::fmtBulk(ss, std::string(bulkBuf.begin(), bulkBuf.end()));
    Command::fmtMultiBulkLen(ss, 1);
    Command::fmtBulk(ss, "INCRMETAEND");
    return ss.str();
  }
} incrMetaCommand;

}  // namespace tendisplus
